/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2017-2021. All rights reserved.
 * Description: blocking queue
 */
#ifndef INFRA_BASE_BLOCKING_QUEUE_H
#define INFRA_BASE_BLOCKING_QUEUE_H

#include <queue>
#include <cstdint>
#include <mutex>
#include <condition_variable>
#include <algorithm>
#include <functional>

namespace hiai {
const int DEFAULT_MAX_QUEUE_SIZE = 2048;

template <typename T>
class BlockingQueue {
public:
    explicit BlockingQueue(uint32_t maxSize = DEFAULT_MAX_QUEUE_SIZE) : maxSize_(maxSize), isStopped_(false)
    {
    }

    ~BlockingQueue()
    {
    }

    bool Pop(T& item)
    {
        std::unique_lock<std::mutex> lock(mutex_);
        std::cv_status status = std::cv_status::no_timeout;
        while (queue_.empty() && !isStopped_) {
            status = emptyCond_.wait_for(lock, std::chrono::seconds(1));
            if (status == std::cv_status::timeout) {
                continue;
            }
        }

        if (queue_.empty()) {
            return false;
        }
        item = queue_.front();
        queue_.pop();

        if (isStopped_) {
            return false;
        }
        fullCond_.notify_one();
        return true;
    }

    bool Push(const T& item, bool isWait = true)
    {
        std::unique_lock<std::mutex> lock(mutex_);
        while (queue_.size() >= maxSize_ && !isStopped_) {
            if (!isWait) {
                return false;
            }
            fullCond_.wait(lock);
        }

        if (isStopped_) {
            return false;
        }
        queue_.push(item);
        emptyCond_.notify_one();
        return true;
    }

    bool Remove(std::function<bool(const T&)> pred)
    {
        std::unique_lock<std::mutex> lock(mutex_);

        if (isStopped_) {
            return false;
        }

        bool res = false;
        std::queue<T> tmp;
        while (!queue_.empty()) {
            if (pred(queue_.front())) {
                res = true;
            } else {
                tmp.push(queue_.front());
            }
            queue_.pop();
        }
        queue_.swap(tmp);
        if (res) {
            fullCond_.notify_one();
        }
        return res;
    }

    void Stop()
    {
        {
            std::unique_lock<std::mutex> lock(mutex_);
            isStopped_ = true;
        }
        fullCond_.notify_all();
        emptyCond_.notify_all();
    }

    void Restart()
    {
        {
            std::unique_lock<std::mutex> lock(mutex_);
            isStopped_ = false;
        }
    }

    // if the queue is stoped ,need call this function to release the unprocessed items
    std::queue<T> GetRemainItems()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        if (!isStopped_) {
            return std::queue<T>();
        }
        return queue_;
    }

    size_t GetRemainItemsSize()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        return queue_.size();
    }

    bool IsFull()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        return queue_.size() >= maxSize_;
    }

    void Clear()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        queue_ = std::queue<T>();
    }

private:
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable emptyCond_;
    std::condition_variable fullCond_;
    uint32_t maxSize_;
    bool isStopped_;
};
}
#endif // INFRA_BASE_BLOCKING_QUEUE_H
